package com.jianying.aws;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Small Flink Table API job that registers a {@code mysql-cdc} source table named
 * {@code user_events} and prints the result of {@code select * from user_events}.
 *
 * <p>Connection settings can be overridden through environment variables; when a
 * variable is unset or empty, the value that was previously hard-coded is used:
 * <ul>
 *   <li>{@code MYSQL_HOSTNAME} - database host</li>
 *   <li>{@code MYSQL_PORT} - database port</li>
 *   <li>{@code MYSQL_USERNAME} - login user</li>
 *   <li>{@code MYSQL_PASSWORD} - login password</li>
 *   <li>{@code MYSQL_DATABASE} - database (schema) name</li>
 * </ul>
 */
public class testMysql {
    // Fallback values, identical to the literals that used to be embedded in the DDL.
    private static final String DEFAULT_HOSTNAME =
            "database-mysql-1.cfgi4aw8kw3z.rds.cn-north-1.amazonaws.com.cn";
    private static final String DEFAULT_PORT = "3306";
    private static final String DEFAULT_USERNAME = "root";
    // NOTE(review): a real credential is committed to source here. It is kept only so the
    // job still runs without any environment configuration; rotate it and supply the
    // password via MYSQL_PASSWORD instead, then remove this default.
    private static final String DEFAULT_PASSWORD = ".Ljy12qw";
    private static final String DEFAULT_DATABASE = "mytest";

    /**
     * Creates the stream/table environments, registers the CDC source table and prints
     * every row produced by selecting from it.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(buildSourceDdl());

        TableResult selectResult = tableEnv.executeSql("select * from user_events");
        selectResult.print();
    }

    /**
     * Builds the {@code CREATE TABLE} statement for the {@code user_events} source.
     * With no environment overrides the returned text equals the original literal DDL.
     *
     * @return the DDL statement to pass to {@code executeSql}
     */
    private static String buildSourceDdl() {
        String hostname = setting("MYSQL_HOSTNAME", DEFAULT_HOSTNAME);
        String port = setting("MYSQL_PORT", DEFAULT_PORT);
        String username = setting("MYSQL_USERNAME", DEFAULT_USERNAME);
        String password = setting("MYSQL_PASSWORD", DEFAULT_PASSWORD);
        String database = setting("MYSQL_DATABASE", DEFAULT_DATABASE);

        return "CREATE TABLE user_events (\n" +
                "    id INT,\n" +
                "    user_id INT,\n" +
                "    event_type STRING,\n" +
                "    event_time TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = '" + sqlLiteral(hostname) + "',\n" +
                "    'port' = '" + sqlLiteral(port) + "',\n" +
                "    'username' = '" + sqlLiteral(username) + "',\n" +
                "    'password' = '" + sqlLiteral(password) + "',\n" +
                "    'database-name' = '" + sqlLiteral(database) + "',\n" +
                "    'table-name' = 'user_events',\n" +
                "    'scan.incremental.snapshot.chunk.key-column' = 'id',\n" +
                "    'server-time-zone' = 'Asia/Shanghai'\n" +
                ");";
    }

    /**
     * Returns the value of the given environment variable, or {@code fallback} when the
     * variable is unset or empty.
     */
    private static String setting(String envName, String fallback) {
        String value = System.getenv(envName);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    /**
     * Escapes a value for use inside a single-quoted SQL string literal by doubling
     * every single quote, so an externally supplied setting cannot terminate the literal.
     */
    private static String sqlLiteral(String value) {
        return value.replace("'", "''");
    }
}
